/*
 *  Copyright (C) 2004 Cidero, Inc.
 *
 *  Permission is hereby granted to any person obtaining a copy of 
 *  this software to use, copy, modify, merge, publish, and distribute
 *  the software for any non-commercial purpose, subject to the
 *  following conditions:
 *  
 *  The above copyright notice and this permission notice shall be included
 *  in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 *  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
 *  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY IN CONNECTION WITH THE SOFTWARE.
 * 
 *  File: $RCSfile: SyncShoutcastGroup.java,v $
 *
 */

package com.cidero.bridge;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Vector;
import java.util.logging.Logger;

import com.cidero.http.HTTPConnection;
import com.cidero.util.ByteBufferQueue;
import com.cidero.util.ShoutcastOutputStream;


/**
 * Experimental class to see if it's possible to sync up multiple HTTP-get
 * shoutcast sessions by outputting the data from a single thread.
 *
 * The design idea: when a new client joins the shoutcast 'station', enough
 * data is sent to prime the new client's input buffers. Then the data
 * transfer is halted for all clients so that they will all drain their
 * buffers and be in more or less the same state (assuming the same physical
 * device). Then data output is resumed.
 *
 * Note: the active {@link #run()} loop only fans each queued packet out to
 * every registered stream; it does not perform the halt/resume step
 * described above.
 *
 * Probability of success?  25% maybe ?
 *
 * Threading: {@link #run()} executes in a single thread, while
 * {@link #addStream}, {@link #stop()} and {@link #processMetadata} may be
 * called from other threads.
 *
 * @author <a href="mailto:newell@mediarush.com"></a>
 * @version 1.0
 */
public class SyncShoutcastGroup
{
  private static final Logger logger = Logger.getLogger("com.cidero.bridge");

  /** Session that owns the queue this group reads its audio data from. */
  MediaRendererSession parentSession;

  // Use vector for stream list since its individual operations are
  // synchronized (streams are added from other threads while run() is
  // iterating).
  Vector  streamInfoList = new Vector();

  // Set by stop() from other threads and polled by run(); volatile so the
  // streaming thread is guaranteed to observe the change.
  volatile boolean running = false;

  /**
   * Pairs a client's shoutcast output stream with the HTTP connection
   * that carries it, so the connection can be closed when the stream is
   * finished with.
   */
  class StreamInfo
  {
    ShoutcastOutputStream stream;
    HTTPConnection connection;

    public StreamInfo( ShoutcastOutputStream stream,
                       HTTPConnection connection )
    {
      this.stream = stream;
      this.connection = connection;
    }

    public ShoutcastOutputStream getStream() {
      return stream;
    }

    public HTTPConnection getConnection() {
      return connection;
    }
  }
  
  /**
   *  Create a group containing a single initial client.
   *
   *  @param parentSession  session whose queue supplies the stream data
   *  @param stream         output stream of the first client
   *  @param connection     HTTP connection backing <code>stream</code>
   */
  public SyncShoutcastGroup( MediaRendererSession parentSession,
                             ShoutcastOutputStream stream,
                             HTTPConnection connection )
  {
    this.parentSession = parentSession;

    StreamInfo streamInfo = new StreamInfo( stream, connection );
    streamInfoList.add( streamInfo );
  }
  
  /**
   *  Add a new stream to the synchronized group. This function may be
   *  invoked by other threads.
   *
   *  @param stream      output stream of the joining client
   *  @param connection  HTTP connection backing <code>stream</code>
   */
  public synchronized void addStream( ShoutcastOutputStream stream,
                                      HTTPConnection connection )
  {
    logger.fine(" ADDING NEW STREAM TO SHOUTCAST GROUP !!!!!!!!\n");
    StreamInfo streamInfo = new StreamInfo( stream, connection );
    streamInfoList.add( streamInfo );
  }
  
  /**
   *  Ask the streaming loop in {@link #run()} to terminate. The loop
   *  notices the request after its current queue read (at most one
   *  5-second timeout period if the queue is idle) and then closes all
   *  connections.
   */
  public void stop()
  {
    running = false;
  }
  
  /**
   *  Run a synchronous shoutcast session. This normally runs in the
   *  context of the HTTP-GET request thread of the 1st requester in the
   *  group.
   *
   *  Each buffer taken from the parent session's queue is written to every
   *  stream in the group. A stream whose write fails is removed from the
   *  group and its connection closed; the remaining streams still receive
   *  the packet. The loop ends when {@link #stop()} is called, the queue
   *  reports EOF, or the last stream has been dropped. All remaining
   *  connections are closed on exit.
   */ 
  public void run()
  {
    running = true;
    
    //
    //  Attach to queue with streaming data.
    //
    ByteBufferQueue queue = parentSession.getQueue();
    byte[] buf = new byte[65536];
    long totalBytes = 0;
    long lastTotal = 0;

    logger.fine(" SyncShoutGroup: starting xfers ");

    while( running )
    {
      ByteBuffer qBuf;

      try 
      {
        qBuf = queue.get( 5000 );
      }
      catch( IOException e )
      {
        logger.info("SyncShoutcastGroup: Queue EOF");
        running = false;
        break;
      }

      if( qBuf == null )
      {
        logger.warning("SyncShoutcastGroup: 5-sec timeout reading queue " );

        // Re-evaluate the loop condition rather than returning, so a
        // stop() issued while the queue is idle still falls through to
        // close() below.
        continue;
      }
        
      // qBuf.limit() contains the number of bytes actually received
      int bytes = qBuf.limit();

      // Grow the local buffer if the queue ever hands over more data than
      // fits; otherwise the copy would fail with IndexOutOfBoundsException
      if( bytes > buf.length )
        buf = new byte[bytes];

      qBuf.get( buf, 0, bytes );  // copy to local buf
      
      totalBytes += bytes;

      // Iterate over a snapshot so that removing a failed stream (here) or
      // adding a stream (from another thread) cannot disturb the loop.
      Object[] targets = streamInfoList.toArray();

      for( int n = 0 ; n < targets.length ; n++ )
      {
        StreamInfo streamInfo = (StreamInfo)targets[n];

        // Failures are handled per stream so that one broken client does
        // not cause the clients after it in the list to miss this packet.
        try 
        {
          //
          // Write packet out to HTTP client.
          // 
          streamInfo.getStream().write( buf, 0, bytes );
        }
        catch( IOException e )
        {
          logger.fine("Exception writing to socket " + e );

          // remove the offending (presumably closed) stream and release
          // its connection, which close() would no longer reach
          dropStream( streamInfo );

          if( streamInfoList.size() == 0 )
            running = false;
        }
      }

      if( (totalBytes - lastTotal) > 65536*2 )
      {
        logger.fine("SyncShoutcast Server: totalBytes = " + totalBytes );
        lastTotal = totalBytes;
      }
              
      queue.free( qBuf );

    } // while ( running )
    
    //
    // Done - close off the connections
    //
    logger.fine("SyncShoutcastGroup: Done - closing connections" );
    close();

  }

  /**
   *  Remove a stream from the group and close its HTTP connection.
   */
  private void dropStream( StreamInfo streamInfo )
  {
    streamInfoList.remove( streamInfo );
    streamInfo.getConnection().close();
  }
  
  /**
   *  Close the HTTP connection of every stream currently in the group.
   *  The streams themselves stay registered in the list.
   */
  public void close()
  {
    // Snapshot so concurrent additions cannot affect the iteration
    Object[] targets = streamInfoList.toArray();

    for( int n = 0 ; n < targets.length ; n++ )
    {
      StreamInfo streamInfo = (StreamInfo)targets[n];

      HTTPConnection connection = streamInfo.getConnection();
      logger.fine("SyncShoutcastGroup: closing connection" );
      connection.close();
    }

  }

  // Most recent metadata string forwarded to the streams; used to
  // suppress repeats.
  String lastMetadata = "dummy";
  
  /**
   *  Forward a shoutcast metadata string to every stream in the group.
   *  A <code>null</code> value, or a value equal to the one most recently
   *  forwarded, is ignored.
   *
   *  @param metadata  metadata string to pass along to the clients
   */
  public void processMetadata( String metadata )
  {
    logger.fine("processing metadata: " + metadata );

    if( metadata == null || metadata.equals( lastMetadata ) )
      return;
    
    lastMetadata = metadata;
    
    Object[] targets = streamInfoList.toArray();

    for( int n = 0 ; n < targets.length ; n++ )
    {
      StreamInfo streamInfo = (StreamInfo)targets[n];

      // Pass metadata along to device via output stream. It is inserted
      // every 8192 bytes using the proxy's icy-metaint value of 8192.
      // This may be different than the incoming frequency!
      streamInfo.getStream().setMetadata( metadata );
    }
  }

}


/*
      try 
      {
        while( (qBuf = queue.get( 5000 )) == null )
        {
          System.out.println("HTTPServer: Timeout reading queue ");
          if( !running ) 
            return;
        }
        
        // qBuf.limit() contains the number of bytes actually received
        bytes = qBuf.limit();
        qBuf.get( buf, 0, bytes );  // copy to local buf
      }
      catch( IOException e )
      {
        System.out.println("HTTPServer: Queue EOF!");
        break;
      }
      
      //System.out.println("Proxy server - got packet of length " +
      //                           qBuf.limit() );
        
      totalBytes += bytes;
      if( (totalBytes - lastTotal) > 65536 )
      {
        streamTitle = "StreamTitle='Total Bytes - " + totalBytes + "';";
      }
        
      try 
      {
        //
        // Check to see if new stream was added since last xmit - 
        // if so, output to new device for 64k (4 sec), then pause a 
        // while to starve all devices equally (sync), then restart
        //
        if( newStream )
        {
          logger.fine("New client connected - syncing...");

          newStreamBytes += bytes;
          
          if( newStreamBytes > 128*1024 )  // Try 128K for AudioTron
          {
            // Wait a bit for data to drain from all devices
            logger.fine("  waiting 30 seconds for sync... \n");

            try { Thread.sleep(30000); } catch( InterruptedException e ) { }

            logger.fine("  restarting data flow");

            resetNewStream();  // synchronized method

            for( int n = 0 ; n < streamInfoList.size() ; n++ )
            {
              streamInfo = (StreamInfo)streamInfoList.get(n);
              streamInfo.enableDelay();
            }
          }
        }

        for( int n = 0 ; n < streamInfoList.size() ; n++ )
        {
          streamInfo = (StreamInfo)streamInfoList.get(n);

          stream = streamInfo.getStream();
          
          //
          // Write packet out to HTTP client. Note - this call can block
          // for a little longer than one might expect, since the socket
          // connection (under Linux at least) seems to require a bunch
          // of free space (48k ?) in the network buffering layer before
          // allowing this task to wake up and write 4096 bytes
          // 

          int sendBytes = streamInfo.getBytesToSend( bytes );

          if( sendBytes > 0 )
          {
            stream.write( buf, bytes-sendBytes, sendBytes );
          }
          else
          {
            logger.fine("sendBytes = 0 for stream " + n + 
                        " delay = " + streamInfo.getBufferingDelayBytes() );
          }
          
          if( (totalBytes - lastTotal) > 65536 )
          {
            stream.setMetadata( streamTitle );
          }
        }
      }
      catch( IOException e )
      {
        logger.fine("Exception writing to socket " );

        // remove the offending (presumably closed) stream
        streamInfoList.remove( streamInfo );
      }
      
      if( (totalBytes - lastTotal) > 65536*2 )
      {
        logger.fine("HTTPServer: totalBytes = " + totalBytes );
        lastTotal = totalBytes;
      }
        
      //System.out.println("Proxy server - freeing packet");
      queue.free( qBuf );

    }
    
    // Done - close off the connections
    for( int n = 0 ; n < streamInfoList.size() ; n++ )
    {
      streamInfo = (StreamInfo)streamInfoList.get(n);
      HTTPConnection connection = streamInfo.getConnection();
      connection.close();
    }

    logger.fine("HTTPServer: shutting down session" );

*/
